/**
 * 
 */
package com.fnic.pearl.scheduler.dao;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisConnectionException;

import com.fnic.pearl.scheduler.constant.SchedulerRedisKey;
import com.fnic.pearl.scheduler.constant.TestTaskStatus;
import com.fnic.pearl.scheduler.mod.TestTaskSchedulerRuler;
import com.fnic.pearl.scheduler.model.HeartBeatResp;
import com.fnic.pearl.scheduler.model.ProbeTaskStatus;
import com.fnic.pearl.scheduler.model.TaskStats;
import com.fnic.pearl.scheduler.model.TestTask;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * 负责所有需要和 redis/db 存储的交互
 * 
 * @author HuHaiyang
 * @date 2013年7月21日
 */
public class SchedulerStore
{
    private static Logger LOG = Logger.getLogger(SchedulerStore.class);
    
    /**
     * singleton access
     */
    private static SchedulerStore instance = new SchedulerStore();
    public static SchedulerStore getInstance()
    {
        return instance;
    }
    
    /**
	 * 存储各探针上的需通过心跳响应返回的非测量任务的消息队列
	 * 
	 * key: probeId value: 为 FIFO 的心跳响应列表
	 */
    private Map<Integer, List<HeartBeatResp>> probeHbRespQueue;
    private ReentrantLock lockHbResp = new ReentrantLock();
    
    /**
	 * 测试任务队列，存放所有的任务信息 key 为任务ID，value 为任务详情 在任务执行完后，也需要从此队列中清除该任务的记录
	 */
    private ConcurrentHashMap<Long, TestTask> taskQueue = null;
    
    /**
	 * S_NEW_TASK 初始任务 状态的任务主要由 SchedulerHandler 进行调度处理 该队列采用单向阻塞队列进行处理
	 */
    private List<TestTask> initTaskQueue = new ArrayList<TestTask>();
    private ReentrantLock lockInit = new ReentrantLock();
    private Condition condInitHas = lockInit.newCondition();
    
    /**
	 * S_WISSUE
	 * 
	 * 主要在接收到探针的心跳消息后，阻塞获取待执行的测量任务 wissue 中为了避免多个探针同步阻塞之间的问题 采用一个探针对应一个锁
	 */
    private Map<Integer, LinkedBlockingDeque<TestTask>> wissueTaskQueue 
        = new HashMap<Integer, LinkedBlockingDeque<TestTask>>();
    private Map<Integer, ReentrantLock> locksWissue = new HashMap<Integer, ReentrantLock>();
    private Map<Integer, Condition> condsWissueHas = new HashMap<Integer, Condition>();
    
    /**
	 * 各个状态的测量任务队列
	 * 
	 * - key 为 probeId - value 为任务队列
	 */
    
    // S_ISSUED
    private Map<Integer, Map<Long, TestTask>> issuedTaskQueue 
        = new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockIssued = new ReentrantLock();
    
    // S_RUNNING
    private Map<Integer, Map<Long, TestTask>> runningTaskQueue 
        = new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockRunning = new ReentrantLock();
    
    // S_EXEC_SUCC
    private Map<Integer, Map<Long, TestTask>> execSuccTaskQueue
        = new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockExecSucc = new ReentrantLock();
    
    // S_ISSUE_ES_SUCC
    private Map<Integer, Map<Long, TestTask>> issueEsSuccTaskQueue
        = new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockIssueEsSucc = new ReentrantLock();
    
    // S_ISSUE_REDIS_SUCC
    private Map<Integer, Map<Long, TestTask>> issueRedisSuccTaskQueue
    	= new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockIssueRedisSucc = new ReentrantLock();
    
    /**
	 * S_FAILED_TASK_HANGUP
	 * 
	 * 挂起状态的任务在相应的探针恢复后，回退至之前的状态
	 */
    private Map<Integer, List<TestTask>> hangupTaskQueue
        = new HashMap<Integer, List<TestTask>>();
    private final ReentrantLock lockHangup = new ReentrantLock();
    
    /**
	 * 所有最终失败状态(即到此状态，则一次执行流程结束)的队列，包括如下:
	 * 
	 * - S_FAILED_TASK_EXEC - S_FAILED_TASK_PARSE - S_FAILED_RESULT_ISSUE -
	 * S_FAILED_TIMEOUT - S_FAILED_PROBE_LOGOUT
	 * 
	 * 这里并不包括 HangUp 状态，因为 hangup 状态不是一个最终的失败状态
	 */
    private Map<Integer, Map<Long, TestTask>> failedTaskQueue
        = new HashMap<Integer, Map<Long, TestTask>>();
    private final ReentrantLock lockFailed = new ReentrantLock();
        
    /**
	 * 测量任务状态队列 主要由 测量任务上报线程 进行访问
	 */
    private List<ProbeTaskStatus> taskStatusQueue = new ArrayList<ProbeTaskStatus>();
    private final ReentrantLock lockTSQ = new ReentrantLock();
    private Condition condTSQ = lockTSQ.newCondition();
    
    /**
	 * 测试任务统计
	 */
    private TaskStats stats = TaskStats.getInstance();
    
    /**
     * @return the stats
     */
    public TaskStats getStats()
    {
        return stats;
    }

    /**
	 * 是否需要与 redis 同步数据
	 */
    private boolean redisEnable = true;
    
    /**
	 * 初始化工作，如：redis pool 的创建
	 */
    public boolean init(boolean redisEnable,
            String redisMasterIp, 
			int redisMasterPort,
 String redisSlaveIp, int redisSlavePort,
            int redisMasterTimeout)
    {
        this.redisEnable = redisEnable;
        if (redisEnable)
        {
			// TODO 设置 master/slave 特性
            redisPool = new JedisPool(new JedisPoolConfig(), 
                    redisMasterIp, redisMasterPort, redisMasterTimeout*1000);
        }
        
        taskQueue = new ConcurrentHashMap<Long, TestTask>();
        probeHbRespQueue = new HashMap<Integer, List<HeartBeatResp>>();
        
		// 启动 测量任务超时检测器
        new Timer().schedule(new TestTaskTimeoutCheck(), 1000, 1000);
        
        return true;
    }
    
    /**
	 * 结束退出的工作
	 */
    public void destory()
    {
        if (redisEnable && redisPool != null)
        {
            redisPool.destroy();
        }
    }
    
    /**
	 * 任务超时检测线程， 这里采用每隔1秒的定时器线程来避免针对每个任务启动定时器， 而造成系统开销的问题
	 * 
	 * 当前只会检测上一个状态为 HANGUP 或者 当前状态为 HANGUP 的任务 且 状态没有变化的时长为 5 分钟的任务会重置为 TIMEOUT
	 * 状态
	 */
    class TestTaskTimeoutCheck extends TimerTask
    {
		// 任务超时时长，单位: ms
        public static final long TASK_TIMEOUT = 5*60*1000;
        
        @Override
        public void run()
        {
            final long now = System.currentTimeMillis();
            
            Iterator<Long> iterTaskId = taskQueue.keySet().iterator();
            while (iterTaskId.hasNext())
            {
                long taskId = iterTaskId.next();
                TestTask tt = taskQueue.get(taskId);
                if (tt != null 
                        && (tt.getLastStatus() == TestTaskStatus.S_FAILED_TASK_HANGUP
                                || tt.getStatus() == TestTaskStatus.S_FAILED_TASK_HANGUP)
                        && (now - tt.getLastStatusTime()) > TASK_TIMEOUT)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_TIMEOUT);
                    putSFHTestTask(tt);
                    
                    ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                            tt.getTaskId(), TestTaskStatus.S_FAILED_TIMEOUT);
                    pts.setExecNum(tt.getExecNum());
                    SchedulerStore.getInstance().putTestTaskStatus(pts);
                }
            }
        }
    }
    
    /**
	 * 切换退出探针上的任务状态为 LOGOUT，可切换的状态有：
	 * 
	 * - S_ISSUED - S_RUNNING - S_EXEC_SUCC
	 */
    public void switchLogoutTask(int probeId)
    {
        // issued
        if (issuedTaskQueue.containsKey(probeId))
        {
            Map<Long, TestTask> tasks = issuedTaskQueue.get(probeId);
            Iterator<Long> iter = tasks.keySet().iterator();
            while (iter.hasNext())
            {
                long taskId = iter.next();
                TestTask tt = tasks.get(taskId);
                if (tt != null
                        && tt.getStatus() == TestTaskStatus.S_ISSUED)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    
                    ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                            tt.getTaskId(), TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    pts.setExecNum(tt.getExecNum());
                    putTestTaskStatus(pts);
                }
            }
        }
        
        // running
        if (runningTaskQueue.containsKey(probeId))
        {
            Map<Long, TestTask> tasks = runningTaskQueue.get(probeId);
            Iterator<Long> iter = tasks.keySet().iterator();
            while (iter.hasNext())
            {
                long taskId = iter.next();
                TestTask tt = tasks.get(taskId);
                if (tt != null
                        && tt.getStatus() == TestTaskStatus.S_RUNNING)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    
                    ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                            tt.getTaskId(), TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    pts.setExecNum(tt.getExecNum());
                    putTestTaskStatus(pts);
                }
            }
        }
        
        // exec succ
        if (execSuccTaskQueue.containsKey(probeId))
        {
            Map<Long, TestTask> tasks = execSuccTaskQueue.get(probeId);
            Iterator<Long> iter = tasks.keySet().iterator();
            while (iter.hasNext())
            {
                long taskId = iter.next();
                TestTask tt = tasks.get(taskId);
                if (tt != null
                        && tt.getStatus() == TestTaskStatus.S_EXEC_SUCC)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    
                    ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                            tt.getTaskId(), TestTaskStatus.S_FAILED_PROBE_LOGOUT);
                    pts.setExecNum(tt.getExecNum());
                    putTestTaskStatus(pts);
                }
            }
        }
    }
    
    //-------- heartbeat response ---------
    
    /**
	 * 新增一条心跳响应记录
	 * 
	 * @param probeId
	 *            probe id
	 * @param hbr
	 *            heart beat response record
	 * @return true when add success, false when failed
	 */
    public boolean pushHbResp(int probeId, HeartBeatResp hbr)
    {
        lockHbResp.lock();
        List<HeartBeatResp> lstHBR = probeHbRespQueue.get(probeId);
        if (lstHBR == null)
        {
            lstHBR = new ArrayList<HeartBeatResp>();
            probeHbRespQueue.put(probeId, lstHBR);
        }
        lstHBR.add(hbr);
        lockHbResp.unlock();
        return true;
    }
    
    /**
	 * 从指定探针的心跳响应队列顶部弹出一条记录
	 * 
	 * @param probeId
	 *            probe id
	 * @return true when success, otherwise return false
	 */
    public HeartBeatResp popHbResp(int probeId)
    {
        lockHbResp.lock();
        List<HeartBeatResp> lstHBR = probeHbRespQueue.get(probeId);
        if (lstHBR == null || lstHBR.size() == 0)
        {
            lockHbResp.unlock();
            return null;
        }
        HeartBeatResp hbr = lstHBR.remove(0);
        lockHbResp.unlock();
        return hbr;
    }
    
    /**
	 * 连接 redis 的 pool
	 */
    private static JedisPool redisPool = null;
    
    /**
	 * 对 task 的有效性进行基本验证
	 * 
	 * @param tt
	 *            测量任务记录
	 * @return 验证成功返回 true，否则返回 false
	 */
    private boolean validTestTask(TestTask tt)
    {
        if (tt == null)
        {
            LOG.warn("valid task - is null");
            return false;
        }
        else if (tt.getProbe() == null || tt.getProbe() <= 0)
        {
            LOG.warn("valid task - probe is null or less than 0");
            return false;
        }
        else if (tt.getTaskId() <= 0)
        {
            LOG.warn("valid task - taskId less than 0");
            return false;
        }
        
        return true;
    }
    
    /**
	 * 获取指定 taskId 的测量任务记录
	 * 
	 * @param taskId
	 *            任务ID
	 * @return 返回查找到的 task，否则返回 null
	 */
    public TestTask getTestTask(long taskId)
    {
        return taskQueue.get(taskId);
    }
    
    /**
	 * 添加 task 至任务队列中
	 * 
	 * @param tt
	 *            任务记录
	 * @return 添加成功返回 true，否则返回 false
	 */
    public boolean putTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("tq - put failed, invalid task!");
            return false;
        }
        
        taskQueue.put(tt.getTaskId(), tt);
        if (LOG.isDebugEnabled()) LOG.debug("tq - put task, taskId = " + tt.getTaskId());
        return true;
    }
    
    /**
	 * 从 任务队列 中移除指定任务 taskId 的记录
	 * 
	 * @param taskId
	 *            任务标识
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean popTestTask(long taskId)
    {
        if (taskId <= 0)
        {
            LOG.warn("tq - pop failed, taskId <= 0!");
            return false;
        }
        
        taskQueue.remove(taskId);
        return true;
    } 
    
    //--------- init ---------
    
    /**
	 * 以阻塞方式获取指定 new 状态队列 队头的测量任务 主要由 调度控制线程 访问
	 * 
	 * @param probeId
	 *            探针标识
	 * @return 返回获取的队头测量任务，如果异常，则返回 null
	 * @throws InterruptedException
	 *             interrupted
	 */
    public TestTask popInitTestTask() throws InterruptedException
    {
        lockInit.lock();
        try
        {
            if (initTaskQueue.size() == 0)
            {
                TestTask tt = popInitTestTaskFromRedis(0);
                if (tt != null)
                {
                    if (LOG.isDebugEnabled()) LOG.debug("init pop redis - " + tt);
                    return tt;
                }
            }
            
            while (initTaskQueue.size() <= 0)
            {
                condInitHas.await();
            }
            
            TestTask tt = initTaskQueue.remove(0);
            delInitTestTaskFromRedis(0, tt.getTaskId());
            stats.decrInitTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("init pop - " + tt);
            return tt;
        }
        finally
        {
            lockInit.unlock();
        }
    }
    
    /**
	 * 新增 初始状态 的测量任务至队尾
	 * 
	 * @param tt
	 *            测量任务，状态应该为 INIT
	 * @return 新增成功，返回 true，否则，返回 false
	 */
    public boolean putInitTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("init - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_NEW_TASK)
        {
            LOG.warn("init - put failed, task status is not INIT");
            return false;
        }
        
        lockInit.lock();
        try
        {
            addInitTestTaskFromRedis(tt);
            initTaskQueue.add(tt);
            stats.incrInitTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("init put - " + tt);
            putTestTask(tt);
        }
        finally
        {
            condInitHas.signalAll();
            lockInit.unlock();
        }
        
        return true;
    }
    
    /**
	 * 从 init 任务队列中移除指定的 task
	 * 
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeInitTestTask(long taskId)
    {
        lockInit.lock();
        try
        {
            int probeId = -1;
            for (TestTask tt : initTaskQueue)
            {
                if (tt.getTaskId() == taskId)
                {
                    initTaskQueue.remove(tt);
                    probeId = tt.getProbe();
                    stats.incrInitTaskNum(probeId);
                    break;
                }
            }
            //delInitTestTaskFromRedis(probeId, taskId);
            if (LOG.isDebugEnabled()) LOG.debug("init rm - pid:" + probeId + "|tid:" + taskId);
            return true;
        }
        
        finally
        {
            lockInit.unlock();
        }
    }
    
    //--------- wissue ---------
    
    private ReentrantLock getWissueLock(int probeId)
    {
        ReentrantLock lockWissue = null;
        if (!locksWissue.containsKey(probeId))
        {
            lockWissue = new ReentrantLock();
            locksWissue.put(probeId, lockWissue);
            condsWissueHas.put(probeId, lockWissue.newCondition());
        }
        lockWissue = locksWissue.get(probeId);
        return lockWissue;
    }
    
    /**
	 * 获取指定 probe 的一条或多条 wissue 状态的测量任务 最多返回 maxNum 条，且最多阻塞 timeout 秒
	 * 
	 * @param probeId
	 *            探针标识
	 * @param maxNum
	 *            最多返回的 测量任务 数
	 * @param timeout
	 *            阻塞时长，单位：秒，如果为 0，表示一直阻塞到有测量任务返回
	 * @return 返回的测量任务列表
	 */
    public List<TestTask> popWissueTestTask(int probeId, int maxNum, long timeout)
    {
        if (probeId <= 0 || maxNum <= 0 || timeout < 0)
        {
            LOG.warn("wissue - pop failed, invalid param");
            return null;
        }
        
        List<TestTask> tasks = new ArrayList<TestTask>();
        ReentrantLock lockWissue = getWissueLock(probeId);
        lockWissue.lock();
        try
        {
            while (!wissueTaskQueue.containsKey(probeId) 
                    || wissueTaskQueue.get(probeId).size() <= 0)
            {
                Condition condWissueHas = condsWissueHas.get(probeId);
                if (!condWissueHas.await(timeout, TimeUnit.SECONDS))
                {
                    //LOG.info("wissue pop - [" + probeId + "] timeout");
                    return null;
                }
            }
            
            while (maxNum-- > 0)
            {
                TestTask tt = wissueTaskQueue.get(probeId).poll();
                if (tt == null)
                {
                    break;
                }
                stats.decrWissueTaskNum(probeId);
                tasks.add(tt);
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            return null;
        }
        finally
        {
            lockWissue.unlock();
        }
        
        LOG.info("wissue pop - [" + probeId + "] tnum: " + tasks.size());
        return tasks;
    }
    
    /**
	 * 新增 wissue 状态的 测量任务
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功，返回 true，否则返回 false
	 */
    public boolean putWissueTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("wissue - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_WISSUE)
        {
            LOG.warn("wissue - put failed, task status is not WISSUE");
            return false;
        }
        
        int probeId = tt.getProbe();
        ReentrantLock lockWissue = getWissueLock(probeId);
        Condition condWissueHas = condsWissueHas.get(probeId);
        lockWissue.lock();
        try
        {
            if (!wissueTaskQueue.containsKey(tt.getProbe()))
            {
                LinkedBlockingDeque<TestTask> lbd = new LinkedBlockingDeque<TestTask>();
                wissueTaskQueue.put(tt.getProbe(), lbd);
            }
            
            wissueTaskQueue.get(tt.getProbe()).put(tt);
            stats.incrWissueTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("wissue put - " + tt);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            return false;
        }
        finally
        {
            condWissueHas.signalAll();
            lockWissue.unlock();
        }
        
        return true;
    }
    
    /**
	 * 新增 wissue 状态的 测量任务 至队列的头部
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功，返回 true，否则返回 false
	 */
    public boolean putHeadWissueTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("wissue - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_WISSUE)
        {
            LOG.warn("wissue - put failed, task status is not WISSUE");
            return false;
        }
        
        int probeId = tt.getProbe();
        ReentrantLock lockWissue = getWissueLock(probeId);
        Condition condWissueHas = condsWissueHas.get(probeId);
        lockWissue.lock();
        try
        {
            if (!wissueTaskQueue.containsKey(tt.getProbe()))
            {
                LinkedBlockingDeque<TestTask> lbd = new LinkedBlockingDeque<TestTask>();
                wissueTaskQueue.put(tt.getProbe(), lbd);
            }
            
            wissueTaskQueue.get(tt.getProbe()).putFirst(tt);
            stats.incrWissueTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("wissue put - " + tt);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            return false;
        }
        finally
        {
            condWissueHas.signalAll();
            lockWissue.unlock();
        }
        
        return true;
    }
    
    /**
	 * 从 wissue 任务队列中移除指定的 task
	 * 
	 * @param probeId
	 *            probe id
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeWissueTestTask(int probeId, long taskId)
    {
        ReentrantLock lockWissue = getWissueLock(probeId);
        lockWissue.lock();
        try
        {
            if (!wissueTaskQueue.containsKey(probeId))
            {
                return false;
            }
            
            LinkedBlockingDeque<TestTask> tasks = wissueTaskQueue.get(probeId);
            for (TestTask tt : tasks)
            {
                if (tt.getTaskId() == taskId)
                {
                    tasks.remove(tt);
                    stats.decrWissueTaskNum(probeId);
                    break;
                }
            }
            // TODO delWissueTestTaskFromRedis(probeId, taskId);
            if (LOG.isDebugEnabled()) LOG.debug("wissue rm - pid:" + probeId + "|tid:" + taskId);
            return true;
        }
        
        finally
        {
            lockWissue.unlock();
        }
    }
    
    //--------- issued ---------
    
    /**
	 * 获取指定 probe 上 issued 状态的指定 task
	 * 
	 * @param probeId
	 *            探针标识
	 * @param taskId
	 *            任务标识
	 * @return 返回查找到的测量任务，否则返回 null
	 */
    public TestTask popIssuedTestTask(int probeId, long taskId)
    {
        if (probeId <= 0 || taskId <= 0)
        {
            LOG.warn("issued - pop failed, invalid param");
            return null;
        }
        
        lockIssued.lock();
        try
        {
            Map<Long, TestTask> m = issuedTaskQueue.get(probeId);
            if (m != null && m.containsKey(taskId))
            {
                TestTask tt = m.get(taskId);
                m.remove(taskId);
                stats.decrIssuedTaskNum(probeId);
                if (LOG.isDebugEnabled()) LOG.debug("issued pop - " + tt);
                return tt;
            }
        }
        finally
        {
            lockIssued.unlock();
        }
        
        return null;
    }
    
    /**
	 * 新增 issued 状态的测量任务
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功返回 true，否则返回 false
	 */
    public boolean putIssuedTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("issued - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_ISSUED)
        {
            LOG.warn("issued - put failed, task status is not WISSUE");
            return false;
        }
        
        lockIssued.lock();
        try
        {
            if (!issuedTaskQueue.containsKey(tt.getProbe()))
            {
                Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                issuedTaskQueue.put(tt.getProbe(), m);
            }
            
            issuedTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
            stats.incrIssuedTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("issued put - " + tt);
            return true;
        }
        finally
        {
            lockIssued.unlock();
        }
    }
    
    /**
	 * 从 issued 任务队列中移除指定的 task
	 * 
	 * @param probeId
	 *            probe id
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeIssuedTestTask(int probeId, long taskId)
    {
        lockIssued.lock();
        try
        {
            if (!issuedTaskQueue.containsKey(probeId))
            {
                return false;
            }
            
            Map<Long, TestTask> tasks = issuedTaskQueue.get(probeId);
            tasks.remove(taskId);
            stats.decrIssuedTaskNum(probeId);
            // TODO delWissueTestTaskFromRedis(probeId, taskId);
            if (LOG.isDebugEnabled()) LOG.debug("issued rm - pid:" + probeId + "|tid:" + taskId);
            return true;
        }
        
        finally
        {
            lockIssued.unlock();
        }
    }
    
    /**
	 * 移除该探针下的所有任务并返回
	 */
    public Map<Long, TestTask> popAllIssuedTestTask(int probeId)
    {
        Map<Long, TestTask> tasks = null;
        lockIssued.lock();
        try
        {
            tasks = issuedTaskQueue.remove(probeId);
        }
        finally
        {
            lockIssued.unlock();
        }
        return tasks;
    }
    
    //--------- running ---------
    
    /**
	 * 获取指定 probe 上 running 状态的指定 task
	 * 
	 * @param probeId
	 *            探针标识
	 * @param taskId
	 *            任务标识
	 * @return 返回查找到的测量任务，否则返回 null
	 */
    public TestTask popRunningTestTask(int probeId, long taskId)
    {
        if (probeId <= 0 || taskId <= 0)
        {
            LOG.warn("running - pop failed, invalid param");
            return null;
        }
        
        lockRunning.lock();
        try
        {
            Map<Long, TestTask> m = runningTaskQueue.get(probeId);
            if (m != null && m.containsKey(taskId))
            {
                TestTask tt = m.get(taskId);
                m.remove(taskId);
                stats.decrRunningTaskNum(probeId);
                if (LOG.isDebugEnabled()) LOG.debug("running pop - " + tt);
                return tt;
            }
        }
        finally
        {
            lockRunning.unlock();
        }
        
        return null;
    }
    
    /**
	 * 新增 running 状态的测量任务
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功返回 true，否则返回 false
	 */
    public boolean putRunningTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("running - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_RUNNING)
        {
            LOG.warn("running - put failed, task status is not RUNNING");
            return false;
        }
        
        lockRunning.lock();
        try
        {
            if (!runningTaskQueue.containsKey(tt.getProbe()))
            {
                Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                runningTaskQueue.put(tt.getProbe(), m);
            }
            
            runningTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
            stats.incrRunningTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("running put - " + tt);
            return true;
        }
        finally
        {
            lockRunning.unlock();
        }
    }
    
    /**
	 * 从 running 任务队列中移除指定的 task
	 * 
	 * @param probeId
	 *            probe id
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeRunningTestTask(int probeId, long taskId)
    {
        lockRunning.lock();
        try
        {
            if (!runningTaskQueue.containsKey(probeId))
            {
                return false;
            }
            
            Map<Long, TestTask> tasks = runningTaskQueue.get(probeId);
            tasks.remove(taskId);
            stats.decrRunningTaskNum(probeId);
            // TODO delWissueTestTaskFromRedis(probeId, taskId);
            if (LOG.isDebugEnabled()) LOG.debug("running rm - pid:" + probeId + "|tid:" + taskId);
            return true;
        }
        
        finally
        {
            lockRunning.unlock();
        }
    }
    
    /**
	 * 获取指定探针下的所有测量任务
	 * 
	 * @param probeId
	 *            探针ID
	 * @return 返回测量任务 map，key 为任务ID
	 */
     public Map<Long, TestTask> popRunningTestTask(int probeId)
     {
         if (probeId <= 0)
         {
             LOG.warn("running - pop failed, invalid probeId");
             return null;
         }
         
         Map<Long, TestTask> m = null;
         lockRunning.lock();
         try
         {
             m = runningTaskQueue.remove(probeId);
         }
         finally
         {
             lockRunning.unlock();
         }
         
         return m;
     }
     
    /**
	 * 移除该探针下的所有 running 状态的任务并返回
	 */
    public Map<Long, TestTask> popAllRunningTestTask(int probeId)
    {
        Map<Long, TestTask> tasks = null;
        lockRunning.lock();
        try
        {
            tasks = runningTaskQueue.remove(probeId);
        }
        finally
        {
            lockRunning.unlock();
        }
        return tasks;
    }
    
    //--------- task exec succ ---------
     
    /**
	 * 获取指定 probe 上 exec succ 状态的指定 task
	 * 
	 * @param probeId
	 *            探针标识
	 * @param taskId
	 *            任务标识
	 * @return 返回查找到的测量任务，否则返回 null
	 */
    public TestTask popExecSuccTestTask(int probeId, long taskId)
    {
        if (probeId <= 0 || taskId <= 0)
        {
            LOG.warn("ExecSucc - pop failed, invalid param");
            return null;
        }
         
        lockExecSucc.lock();
        try
        {
            Map<Long, TestTask> m = execSuccTaskQueue.get(probeId);
            if (m != null && m.containsKey(taskId))
            {
                TestTask tt = m.get(taskId);
                m.remove(taskId);
                stats.decrExecSuccTaskNum(probeId);
                if (LOG.isDebugEnabled()) LOG.debug("ExecSucc pop - " + tt);
                return tt;
            }
        }
        finally
        {
            lockExecSucc.unlock();
        }
        
        return null;
    }
    
     
    /**
	 * 新增 执行成功 状态的测量任务
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功，返回 true，否则返回 false
	 */
    public boolean putExecSuccTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("ExecSucc - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_EXEC_SUCC)
        {
            LOG.warn("ExecSucc - put failed, task status is error");
            return false;
        }
        
        lockExecSucc.lock();
        try
        {
            if (!execSuccTaskQueue.containsKey(tt.getProbe()))
            {
                Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                execSuccTaskQueue.put(tt.getProbe(), m);
            }
            
            execSuccTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
            stats.incrExecSuccTaskNum(tt.getProbe());
            if (LOG.isDebugEnabled()) LOG.debug("ExecSucc put - " + tt);
            return true;
        }
        finally
        {
            lockExecSucc.unlock();
        }
    }
    
    /**
	 * 从 execSucc 任务队列中移除指定的 task
	 * 
	 * @param probeId
	 *            probe id
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeExecSuccTestTask(int probeId, long taskId)
    {
        lockExecSucc.lock();
        try
        {
            if (!execSuccTaskQueue.containsKey(probeId))
            {
                return false;
            }
            
            Map<Long, TestTask> tasks = execSuccTaskQueue.get(probeId);
            tasks.remove(taskId);
            stats.decrExecSuccTaskNum(probeId);
            // TODO delWissueTestTaskFromRedis(probeId, taskId);
            if (LOG.isDebugEnabled()) LOG.debug("ExecSucc rm - pid:" + probeId + "|tid:" + taskId);
            return true;
        }
        
        finally
        {
            lockExecSucc.unlock();
        }
    }
    
    /**
	 * 获取指定探针下的所有测量任务
	 * 
	 * @param probeId
	 *            探针ID
	 * @return 返回测量任务 map，key 为任务ID
	 */
    public Map<Long, TestTask> popExecSuccTestTask(int probeId)
    {
         if (probeId <= 0)
         {
             LOG.warn("ExecSucc - pop failed, invalid probeId");
             return null;
         }
         
         Map<Long, TestTask> m = null;
         lockExecSucc.lock();
         try
         {
             m = execSuccTaskQueue.remove(probeId);
         }
         finally
         {
             lockExecSucc.unlock();
         }
         
         return m;
    }
    
    /**
	 * 移除该探针下的所有 exec succ 状态的任务并返回
	 */
    public Map<Long, TestTask> popAllExecSuccTestTask(int probeId)
    {
        Map<Long, TestTask> tasks = null;
        lockExecSucc.lock();
        try
        {
            tasks = execSuccTaskQueue.remove(probeId);
        }
        finally
        {
            lockExecSucc.unlock();
        }
        return tasks;
    }
    
    //--------- hangup ----------
    
    /**
	 * 新增 HangUp 状态的任务
	 */
    public boolean putHangUpTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("HangUp - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_FAILED_TASK_HANGUP)
        {
            LOG.warn("HangUp - put failed, task status is error");
            return false;
        }
        
        lockHangup.lock();
        try
        {
            if (!hangupTaskQueue.containsKey(tt.getProbe()))
            {
                List<TestTask> m = new ArrayList<TestTask>();
                hangupTaskQueue.put(tt.getProbe(), m);
            }
            hangupTaskQueue.get(tt.getProbe()).add(tt);
        }
        finally
        {
            lockHangup.unlock();
        }
        
        return true;
    }
    
    /**
	 * 移除指定探针上处于 HangUp 状态的所有任务列表并返回
	 */
    public List<TestTask> popHangUpTestTask(int probeId)
    {
        if (probeId <= 0)
        {
            LOG.warn("HangUp - pop failed, invalid probeId");
            return null;
        }
        
        List<TestTask> tasks = null;
        lockHangup.lock();
        try
        {
            tasks = hangupTaskQueue.remove(probeId);
        }
        finally
        {
            lockHangup.unlock();
        }
        
        return tasks;
    }
     
    //--------- failed/issue es succ ---------
    
    /**
	 * 获取指定 status 上 probe 的指定 task
	 * 
	 * @param probeId
	 *            探针标识
	 * @param taskId
	 *            任务标识
	 * @param status
	 *            用于在对应的状态队列中查找
	 * @return 返回查找到的 task，否则返回 null
	 */
    public TestTask popSFHTestTask(int probeId, long taskId, int status)
    {
        if (probeId <= 0 || taskId <= 0)
        {
            LOG.warn("SFH - pop failed, invalid param");
            return null;
        }
        else if (status != TestTaskStatus.S_FAILED_TASK_EXEC
                && status != TestTaskStatus.S_ISSUE_ES_SUCC
                && status != TestTaskStatus.S_ISSUE_REDIS_SUCC
                && status != TestTaskStatus.S_FAILED_RESULT_ISSUE
                && status != TestTaskStatus.S_FAILED_ISSUE_REDIS
                && status != TestTaskStatus.S_FAILED_TASK_PARSE
                && status != TestTaskStatus.S_FAILED_TIMEOUT
                && status != TestTaskStatus.S_FAILED_PROBE_LOGOUT
                && status != TestTaskStatus.S_FAILED_TASK_ISSUE)
        {
            LOG.warn("SFH - pop failed, invalid status " + status);
            return null;
        }
        
        TestTask tt = null;
        switch (status)
        {
        case TestTaskStatus.S_FAILED_TASK_EXEC:
        case TestTaskStatus.S_FAILED_RESULT_ISSUE:
        case TestTaskStatus.S_FAILED_ISSUE_REDIS:
        case TestTaskStatus.S_FAILED_TASK_PARSE:
        case TestTaskStatus.S_FAILED_TIMEOUT:
        case TestTaskStatus.S_FAILED_PROBE_LOGOUT:
        case TestTaskStatus.S_FAILED_TASK_ISSUE:
            {
                lockFailed.lock();
                try
                {
                    if (!failedTaskQueue.containsKey(probeId))
                    {
                        return null;
                    }
                    tt = failedTaskQueue.get(probeId).get(taskId);
                }
                finally
                {
                    lockFailed.unlock();
                }
            }
            break;
        
        case TestTaskStatus.S_ISSUE_ES_SUCC:
            {
                lockIssueEsSucc.lock();
                try
                {
                    if (!issueEsSuccTaskQueue.containsKey(probeId))
                    {
                        return null;
                    }
                    tt = issueEsSuccTaskQueue.get(probeId).get(taskId);
                }
                finally
                {
                    lockIssueEsSucc.unlock();
                }
            }
            break;
            
        case TestTaskStatus.S_ISSUE_REDIS_SUCC:
        {
            lockIssueRedisSucc.lock();
            try
            {
                if (!issueRedisSuccTaskQueue.containsKey(probeId))
                {
                    return null;
                }
                tt = issueRedisSuccTaskQueue.get(probeId).get(taskId);
            }
            finally
            {
                lockIssueRedisSucc.unlock();
            }
        }
        break;
        }
        
        if (LOG.isDebugEnabled())
            LOG.debug("SFH pop - status:" + status + "|" + tt);
        return tt;
    }
    
    /**
	 * 新增 failed/hangup 状态的测量任务
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功，返回 true，否则返回 false
	 */
    public boolean putSFHTestTask(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("SFH - put failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_FAILED_TASK_EXEC
                && tt.getStatus() != TestTaskStatus.S_ISSUE_ES_SUCC
                && tt.getStatus() != TestTaskStatus.S_ISSUE_REDIS_SUCC
                && tt.getStatus() != TestTaskStatus.S_FAILED_RESULT_ISSUE
                && tt.getStatus() != TestTaskStatus.S_FAILED_ISSUE_REDIS
                && tt.getStatus() != TestTaskStatus.S_FAILED_TASK_PARSE
                && tt.getStatus() != TestTaskStatus.S_FAILED_TIMEOUT
                && tt.getStatus() != TestTaskStatus.S_FAILED_PROBE_LOGOUT
                && tt.getStatus() != TestTaskStatus.S_FAILED_TASK_ISSUE)
        {
            LOG.warn("SFH - put failed, task status is error");
            return false;
        }
        
        switch (tt.getStatus())
        {
        case TestTaskStatus.S_FAILED_TASK_EXEC:
        case TestTaskStatus.S_FAILED_RESULT_ISSUE:
        case TestTaskStatus.S_FAILED_ISSUE_REDIS:
        case TestTaskStatus.S_FAILED_TASK_PARSE:
        case TestTaskStatus.S_FAILED_TIMEOUT:
        case TestTaskStatus.S_FAILED_PROBE_LOGOUT:
        case TestTaskStatus.S_FAILED_TASK_ISSUE:
            {
                lockFailed.lock();
                try
                {
                    if (!failedTaskQueue.containsKey(tt.getProbe()))
                    {
                        Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                        failedTaskQueue.put(tt.getProbe(), m);
                    }
                    failedTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
                }
                finally
                {
                    lockFailed.unlock();
                }
            }
            break;
            
        case TestTaskStatus.S_ISSUE_ES_SUCC:
            {
                lockIssueEsSucc.lock();
                try
                {
                    if (!issueEsSuccTaskQueue.containsKey(tt.getProbe()))
                    {
                        Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                        issueEsSuccTaskQueue.put(tt.getProbe(), m);
                    }
                    issueEsSuccTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
                }
                finally
                {
                    lockIssueEsSucc.unlock();
                }
            }
            break;
            
        case TestTaskStatus.S_ISSUE_REDIS_SUCC:
        {
            lockIssueRedisSucc.lock();
            try
            {
                if (!issueRedisSuccTaskQueue.containsKey(tt.getProbe()))
                {
                    Map<Long, TestTask> m = new HashMap<Long, TestTask>();
                    issueRedisSuccTaskQueue.put(tt.getProbe(), m);
                }
                issueRedisSuccTaskQueue.get(tt.getProbe()).put(tt.getTaskId(), tt);
            }
            finally
            {
                lockIssueRedisSucc.unlock();
            }
        }
        break;
        }
        
        if (LOG.isDebugEnabled()) LOG.debug("SFH put - " + tt);
        return true;
    }
    
    /**
	 * 从 issued 任务队列中移除指定的 task
	 * 
	 * @param probeId
	 *            probe id
	 * @param taskId
	 *            task id
	 * @return 移除成功返回 true，否则返回 false
	 */
    public boolean removeSFHTestTask(int probeId, long taskId, int status)
    {
        switch (status)
        {
        case TestTaskStatus.S_FAILED_TASK_EXEC:
        case TestTaskStatus.S_FAILED_RESULT_ISSUE:
        case TestTaskStatus.S_FAILED_ISSUE_REDIS:
        case TestTaskStatus.S_FAILED_TASK_PARSE:
        case TestTaskStatus.S_FAILED_TIMEOUT:
        case TestTaskStatus.S_FAILED_PROBE_LOGOUT:
        case TestTaskStatus.S_FAILED_TASK_ISSUE:
            {
                lockFailed.lock();
                try
                {
                    if (!failedTaskQueue.containsKey(probeId))
                    {
                        return false;
                    }
                    failedTaskQueue.get(probeId).remove(taskId);
                }
                finally
                {
                    lockFailed.unlock();
                }
            }
            break;
        
        case TestTaskStatus.S_ISSUE_ES_SUCC:
            {
                lockIssueEsSucc.lock();
                try
                {
                    if (!issueEsSuccTaskQueue.containsKey(probeId))
                    {
                        return false;
                    }
                    issueEsSuccTaskQueue.get(probeId).remove(taskId);
                }
                finally
                {
                    lockIssueEsSucc.unlock();
                }
            }
            break;
        }
        
        if (LOG.isDebugEnabled())
            LOG.debug("SFH remove - status:" + status 
                    + "|probe:" + probeId + "|task:" + taskId);
        return true;
    }
    
    //--------- test task status ---------
    
    /**
	 * 阻塞方式 获取一条或多条 测量任务状态 记录
	 * 
	 * @param maxNum
	 *            最大数目
	 * @return 测量任务状态列表
	 */
    public List<ProbeTaskStatus> popTestTaskStatuses(int maxNum)
    {
        if (maxNum <= 0)
        {
            return null;
        }
        
        List<ProbeTaskStatus> ptss = new ArrayList<ProbeTaskStatus>();
        lockTSQ.lock();
        try
        {
            while (taskStatusQueue.size() <= 0)
            {
                condTSQ.await();
            }
            
            while (maxNum-- > 0 && taskStatusQueue.size() > 0)
            {
                ProbeTaskStatus pts = taskStatusQueue.remove(0);
                if (pts != null)
                {
                    ptss.add(pts);
                }
            }
        }
        catch (InterruptedException e)
        {
            LOG.warn("pop task status - " + e.getMessage());
            return null;
        }
        finally
        {
            lockTSQ.unlock();
        }
        
        return ptss;
    }
    
    /**
	 * 新增测量任务状态记录
	 * 
	 * @param pts
	 *            探针的测量任务状态
	 */
    public boolean putTestTaskStatus(ProbeTaskStatus pts)
    {
        if (pts == null)
        {
            return false;
        }
        
        if (TestTaskSchedulerRuler.isEndStatus(pts.getStatus()))
        {
            int finalStatus = TestTaskSchedulerRuler.testTaskFinal(pts.getTaskId());
            if (finalStatus >= 0)
            {
                pts.setFinalStatus(finalStatus);
            }
        }
        
        lockTSQ.lock();
        try
        {
            taskStatusQueue.add(pts);
            if (LOG.isDebugEnabled()) LOG.debug("TTS - put " + pts);
        }
        finally
        {
            condTSQ.signal();
            lockTSQ.unlock();
        }
        
        return true;
    }
    
    /**
	 * 获取指定任务的状态信息
	 * 
	 * @param taskId
	 *            任务ID
	 * @return 如果存在，则返回 status 记录，否则返回 null
	 */
    public ProbeTaskStatus getTestTaskStatus(long taskId)
    {
        for (ProbeTaskStatus pts : taskStatusQueue)
        {
            if (pts.getTaskId() == taskId)
            {
                return pts;
            }
        }
        
        return null;
    }
    
    //------------------------
    
    /**
	 * 更新任务状态
	 */
    public boolean updateTestTaskStatus(TestTask tt, int newStatus)
    {
        tt.setLastStatus(tt.getStatus());
        tt.setStatus(newStatus);
        if (TestTaskSchedulerRuler.isEndStatus(newStatus))
        {
            updateTestTaskLastEndTime(tt);
        }
        
        updateTestTaskStatusFromRedis(tt.getTaskId(), newStatus);
        return true;
    }
    
    /**
	 * 更新任务计数器
	 */
    public void updateTestTaskExecNum(TestTask tt)
    {
        tt.setExecNum(tt.getExecNum() + 1);
        incrTestTaskExecNumFromRedis(tt.getTaskId());
    }
    
    /**
	 * 更新上一次任务执行完的时间
	 */
    public void updateTestTaskLastEndTime(TestTask tt)
    {
        tt.setLastEndTime(new Date());
        updateTestTaskLastEndTimeFromRedis(tt);
    }
    
    /**
	 * 在任务执行完成后从内存及 redis 中清除
	 */
    public void removeTestTask(TestTask tt)
    {
        switch (tt.getStatus())
        {
        case TestTaskStatus.S_NEW_TASK:
            removeInitTestTask(tt.getTaskId());
            break;
        
        case TestTaskStatus.S_WISSUE:
            removeWissueTestTask(tt.getProbe(), tt.getTaskId());
            break;
            
        case TestTaskStatus.S_ISSUED:
            removeIssuedTestTask(tt.getProbe(), tt.getTaskId());
            break;
            
        case TestTaskStatus.S_RUNNING:
            removeRunningTestTask(tt.getProbe(), tt.getTaskId());
            break;
            
        case TestTaskStatus.S_EXEC_SUCC:
        case TestTaskStatus.S_FAILED_TASK_EXEC:
        case TestTaskStatus.S_FAILED_TASK_HANGUP:
            removeSFHTestTask(tt.getProbe(), tt.getTaskId(), tt.getStatus());
            break;
        }
        
        popTestTask(tt.getTaskId());
        removeTestTaskFromRedis(tt.getTaskId());
    }
    
    //--------------- redis -------------
    
    private static final String REDIS_RESP_OK = "OK";
    
    /**
	 * 从 redis 获取 Init 状态队列顶部的 task
	 * 
	 * @param probeId
	 *            探针标识
	 * @return 返回获取到的顶部 task
	 */
    private TestTask popInitTestTaskFromRedis(int probeId)
    {
        if (probeId < 0)
        {
            return null;
        }
        
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keySP = String.format(
                            SchedulerRedisKey.TESTASK_STATUS_PROBE, TestTaskStatus.S_NEW_TASK, probeId);
                    String taskId = jedis.lpop(keySP);
                    final Gson gson = new GsonBuilder()
                        .setDateFormat("yyyy-MM-dd HH:mm:ss").create();
                    if (taskId != null)
                    {
                        int tid = Integer.valueOf(taskId);
                        
                        // get task record with tid
                        String jsonTask = jedis.get(String.format(SchedulerRedisKey.TESTASK, tid));
                        if (jsonTask != null && jsonTask.length() > 0)
                        {
                            TestTask tt = gson.fromJson(jsonTask, TestTask.class);
                            return tt;
                        }
                    }
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
        
        return null;
    }
    
    /**
	 * 从探针的 init 状态的 任务列表中删除指定的任务
	 * 
	 * @param probeId
	 *            探针标识
	 * @param taskId
	 *            任务标识
	 * @return 删除成功返回 true，否则，返回 false
	 */
    private boolean delInitTestTaskFromRedis(int probeId, long taskId)
    {
        if (probeId < 0 || taskId <= 0)
        {
            LOG.warn("init del redis - invalid param.");
            return false;
        }
        
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keySP = String.format(
                            SchedulerRedisKey.TESTASK_STATUS_PROBE, TestTaskStatus.S_NEW_TASK, probeId);
                    long cnt = jedis.lrem(keySP, 0, String.valueOf(taskId));
                    if (LOG.isDebugEnabled())
                        LOG.debug("init del redis - pid:" + probeId + "|tid:" + taskId + "|cnt:" + cnt);
                    return true;
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
        
        return false;
    }
    
    /**
	 * 新增测量任务至 redis，需要同时添加任务记录 和 init 状态记录
	 * 
	 * @param tt
	 *            测量任务
	 * @return 新增成功，返回 true
	 */
    private boolean addInitTestTaskFromRedis(TestTask tt)
    {
        if (!validTestTask(tt))
        {
            LOG.warn("init redis - add failed, invalid task!");
            return false;
        }
        else if (tt.getStatus() != TestTaskStatus.S_NEW_TASK)
        {
            LOG.warn("init redis - add failed, task status is not INIT");
            return false;
        }
        
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keyTask = String.format(SchedulerRedisKey.TESTASK, tt.getTaskId());
                    Gson gson = new GsonBuilder()
                                    .excludeFieldsWithoutExposeAnnotation()
                                    .setDateFormat("yyyy-MM-dd HH:mm:ss").create();
                    if (!jedis.set(keyTask, gson.toJson(tt)).equals(REDIS_RESP_OK))
                    {
                        LOG.warn("init redis - add [" + keyTask + "] failed!");
                        return false;
                    }
                    else if (LOG.isDebugEnabled())
                    {
                        LOG.debug("init redis - add [" + keyTask + "] succ.");
                    }
                    
                    String keySP = String.format(SchedulerRedisKey.TESTASK_STATUS_PROBE, 
                            TestTaskStatus.S_NEW_TASK, tt.getProbe());
                    long num = jedis.rpush(keySP, String.valueOf(tt.getTaskId()));
                    if (LOG.isDebugEnabled())
                        LOG.debug("init redis - add [" + keySP + "] num " + num);
                    
                    return true;
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
        
        return true;
    }
    
    /**
	 * 更新任务状态记录
	 * 
	 * @param taskId
	 *            任务标识
	 * @param newStatus
	 *            新状态
	 * @return 更新成功返回 true
	 */
    private boolean updateTestTaskStatusFromRedis(long taskId, int newStatus)
    {
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keyTS = String.format(
                            SchedulerRedisKey.TESTASK_TASK_STATUS, taskId);
                    jedis.set(keyTS, String.valueOf(newStatus));
                    if (LOG.isDebugEnabled())
                        LOG.debug("upd TTS redis - add [" + keyTS + "]");
                    return true;
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
        
        return false;
    }
    
    /**
	 * 更新任务的最后一次执行时间 使用 tt.getLastExecTime()
	 */
    private void updateTestTaskLastEndTimeFromRedis(TestTask tt)
    {
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keyTLE = String.format(SchedulerRedisKey.TESTASK_TASK_LASTIME, tt.getTaskId());
                    jedis.set(keyTLE, String.valueOf(tt.getLastEndTime().getTime()));
                    if (LOG.isDebugEnabled())
                        LOG.debug("upd TTLET redis - [" + keyTLE + "]");
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
    }
    
    /**
	 * 递增指定任务的执行次数
	 * 
	 * @param taskId
	 *            测量任务标识
	 */
    private void incrTestTaskExecNumFromRedis(long taskId)
    {
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keyTEC = String.format(
                            SchedulerRedisKey.TESTASK_TASK_EXECNT, taskId);
                    long num = jedis.incr(keyTEC);
                    if (LOG.isDebugEnabled())
                        LOG.debug("incr TEC redis - " + keyTEC + "|num:" + num);
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
    }
    
    /**
	 * 从 redis 中移除所有和指定任务相关的存储，包括：
	 * 
	 * - 任务信息 - 任务当前状态 - 任务计数器 - 任务上一次执行时间
	 * 
	 * @param taskId
	 *            任务标识
	 */
    private void removeTestTaskFromRedis(long taskId)
    {
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                if (jedis != null)
                {
                    String keyTask = String.format(SchedulerRedisKey.TESTASK, taskId);
                    jedis.del(keyTask);
                    
                    String keyTS = String.format(SchedulerRedisKey.TESTASK_TASK_STATUS, taskId);
                    jedis.del(keyTS);
                    
                    String keyTEC = String.format(SchedulerRedisKey.TESTASK_TASK_EXECNT, taskId);
                    jedis.del(keyTEC);
                    
                    String keyTLE = String.format(SchedulerRedisKey.TESTASK_TASK_LASTIME, taskId);
                    jedis.del(keyTLE);
                    
                    if (LOG.isDebugEnabled())
                        LOG.debug("rem redis - " + keyTask + "|" + keyTS + "|" + keyTEC + "|" + keyTLE);
                }
            }
            catch (JedisConnectionException jce)
            {
                LOG.warn("not connect redis - " + jce.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            }
        }
    }
    
    //-----------------
    
    /**
	 * 将指定探针的任务状态从 hangup 切换回上一个状态
	 */
    public void switchTaskFromHangUp(int probeId)
    {
        List<TestTask> tasks = popHangUpTestTask(probeId);
        if (tasks != null && tasks.size() > 0)
        {
            for (TestTask tt : tasks)
            {
                if (tt != null)
                {
                    switch (tt.getLastStatus())
                    {
                    case TestTaskStatus.S_ISSUED:
                        updateTestTaskStatus(tt, tt.getLastStatus());
                        putIssuedTestTask(tt);
                        break;
                        
                    case TestTaskStatus.S_RUNNING:
                        updateTestTaskStatus(tt, tt.getLastStatus());
                        putRunningTestTask(tt);
                        break;
                        
                    case TestTaskStatus.S_EXEC_SUCC:
                        updateTestTaskStatus(tt, tt.getLastStatus());
                        putExecSuccTestTask(tt);
                        break;
                    }
                }
            }
        }
    }
    
    /**
	 * 将指定探针的任务状态切换为 hangup 状态
	 */
    public void switchTaskToHangUp(int probeId)
    {
        // issued
        Map<Long, TestTask> tasks = popAllIssuedTestTask(probeId);
        if (tasks != null && tasks.size() > 0)
        {
            for (TestTask tt : tasks.values())
            {
                if (tt != null && tt.getStatus() == TestTaskStatus.S_ISSUED)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_TASK_HANGUP);
                    putHangUpTestTask(tt);
                }
            }
        }
        
        // running
        tasks = popAllRunningTestTask(probeId);
        if (tasks != null && tasks.size() > 0)
        {
            for (TestTask tt : tasks.values())
            {
                if (tt != null && tt.getStatus() == TestTaskStatus.S_RUNNING)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_TASK_HANGUP);
                    putHangUpTestTask(tt);
                }
            }
        }
        
        // exec succ
        tasks = popAllExecSuccTestTask(probeId);
        if (tasks != null && tasks.size() > 0)
        {
            for (TestTask tt : tasks.values())
            {
                if (tt != null && tt.getStatus() == TestTaskStatus.S_EXEC_SUCC)
                {
                    updateTestTaskStatus(tt, TestTaskStatus.S_FAILED_TASK_HANGUP);
                    putHangUpTestTask(tt);
                }
            }
        }
    }
    
    /**
	 * 注意：清空所有任务 当前仅用于 单元测试 用
	 */
    public synchronized void clear()
    {
        taskQueue.clear();
        
        if (redisEnable)
        {
            Jedis jedis = null;
            try
            {
                jedis = redisPool.getResource();
                jedis.flushDB();
            }
            catch (JedisConnectionException jcex)
            {
                LOG.error("updateTestTask - connect redis exception: " + jcex.getMessage());
                redisPool.returnBrokenResource(jedis);
            }
            catch (Exception e)
            {
                redisPool.returnBrokenResource(jedis);
            }
            finally
            {
                redisPool.returnResource(jedis);
            } 
        }
    }
}
